/*
 * Copyright 2019-2021 the original author or authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package cool.houge.ws;

import cool.houge.ws.packet.ErrorPacket;
import cool.houge.ws.packet.Packet;
import cool.houge.ws.session.Session;
import cool.houge.ws.session.SessionGroupManager;
import cool.houge.ws.session.SessionManager;
import io.avaje.jsonb.JsonException;
import io.avaje.jsonb.Jsonb;
import io.netty.buffer.ByteBufInputStream;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.io.InputStream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.netty.http.websocket.WebsocketInbound;
import reactor.netty.http.websocket.WebsocketOutbound;

/**
 * WebSocket connection handler.
 *
 * <p>For each connection it runs {@link AuthHandler}, registers the resulting {@link Session} with
 * the {@link SessionManager}, and then decodes every inbound frame as a JSON {@link Packet} that is
 * handed to the {@link PacketHandler}.
 *
 * @author KK (kzou227@qq.com)
 */
@Singleton
public class WebsocketHandler {

    private static final Logger log = LogManager.getLogger();

    // JSON serialization
    private static final Jsonb JSONB = Jsonb.builder().build();

    @Inject
    SessionManager sessionManager;

    @Inject
    SessionGroupManager sessionGroupManager;

    @Inject
    AuthHandler authHandler;

    @Inject
    PacketHandler packetHandler;

    @Inject
    ErrorHandler errorHandler;

    /**
     * Handles a newly opened WebSocket connection.
     *
     * <p>Steps: (1) authenticate the connection, (2) add the session to the session manager,
     * (3) start receiving frames for that session.
     *
     * @param inbound the inbound side of the WebSocket connection
     * @param outbound the outbound side of the WebSocket connection
     * @return a {@code Mono} that follows {@code outbound.neverComplete()} once the session is set
     *     up; if authentication or registration fails, the error is logged and the result of
     *     {@code outbound.sendClose()} is returned instead
     */
    public Mono<Void> handle(WebsocketInbound inbound, WebsocketOutbound outbound) {
        if (log.isInfoEnabled()) {
            inbound.withConnection(conn -> log.info("客户端发起链接：{}", conn.address()));
        }
        // 1. Authenticate the session
        // 2. Receive WebSocket messages
        return Mono.defer(() -> authHandler.handle(inbound, outbound))
                .delayUntil(session -> {
                    log.info("将Session添加至会话管理器 {}", session);
                    return sessionManager.add(session);
                })
                // doOnNext instead of doOnSuccess: doOnSuccess is also invoked with a null value
                // when the upstream completes empty, which would start the receive loop with a
                // null session.
                .doOnNext(session -> receiveFrames(inbound, outbound, session))
                .then(outbound.neverComplete())
                .onErrorResume(t -> {
                    log.error("未处理的异常", t);
                    return outbound.sendClose();
                });
    }

    /**
     * Subscribes to the aggregated inbound frames and passes each one to
     * {@link #processFrame(WebSocketFrame, Session)}.
     *
     * <p>When the frame stream terminates for any reason, the session is removed from the session
     * manager and unsubscribed from its groups. Errors from the stream, as well as exceptions
     * thrown while processing a frame, are logged and followed by a close of the connection.
     *
     * @param inbound the inbound side of the WebSocket connection
     * @param outbound the outbound side, used to send the close frame on error
     * @param session the authenticated session that owns the connection
     */
    void receiveFrames(WebsocketInbound inbound, WebsocketOutbound outbound, Session session) {
        Flux.defer(() -> inbound.aggregateFrames().receiveFrames())
                .doFinally(signalType -> {
                    log.info("会话中止 signType={} {}", signalType, session);
                    // Remove the session from the session manager
                    sessionManager
                            .remove(session)
                            .subscribe(null, ex -> log.warn("移除会话失败 session={}", session, ex));
                    // Remove the session from the group session manager
                    sessionGroupManager
                            .unsubscribe(session, session.subGroupIds())
                            .subscribe(null, ex -> log.warn("取消群组订阅失败 session={}", session, ex));
                })
                .subscribeOn(Schedulers.boundedElastic())
                .subscribe(
                        // Process the WebSocket message
                        frame -> processFrame(frame, session),
                        // The error consumer lives on the subscriber so that it also sees
                        // exceptions thrown by processFrame, not only upstream errors.
                        ex -> {
                            log.error("未处理的异常 session={}", session, ex);
                            outbound.sendClose()
                                    .subscribe(null, closeEx -> log.debug("关闭连接失败 session={}", session, closeEx));
                        });
    }

    /**
     * Decodes one frame as a JSON {@link Packet} and dispatches it to the packet handler.
     *
     * <p>If decoding fails, an {@link ErrorPacket} with code {@code ErrorCodes.PACKET} is sent back
     * to the session and the frame is dropped. Errors raised by the packet handler are passed to
     * the {@link ErrorHandler}.
     *
     * @param frame the received WebSocket frame
     * @param session the session the frame belongs to
     */
    void processFrame(WebSocketFrame frame, Session session) {
        Packet packet;
        try (InputStream in = new ByteBufInputStream(frame.content())) {
            packet = JSONB.type(Packet.class).fromJson(in);
        } catch (JsonException e) {
            log.debug("非法的请求包 session={}", session, e);
            sendPacketError(session, "非法的请求包");
            return;
        } catch (IOException e) {
            // Previously swallowed without a trace; log it so the failure can be diagnosed.
            log.warn("读取消息帧失败 session={}", session, e);
            sendPacketError(session, "网络读取异常");
            return;
        }

        Mono.defer(() -> {
                    log.debug("处理消息包 session={} packet={}", session, packet);
                    return packetHandler.handle(session, packet);
                })
                // Error handling
                .onErrorResume(t -> errorHandler.handle(session, t))
                .subscribe(null, ex -> log.error("错误处理失败 session={}", session, ex));
    }

    /** Sends an {@link ErrorPacket} with code {@code ErrorCodes.PACKET} and the given message. */
    private void sendPacketError(Session session, String message) {
        var ep = new ErrorPacket().setCode(ErrorCodes.PACKET).setMessage(message);
        session.send(ep).subscribe(null, ex -> log.warn("发送错误包失败 session={}", session, ex));
    }
}
